{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# HELK: Checking Spark and GraphFrames Integrations\n",
    "----------------------------------------------------------------------------\n",
    "## Goals:\n",
    "* Test if Jupyter can talk to Spark & GraphFrames\n",
    "* Test if Spark & GraphFrames can pull data from ES\n",
    "* Show the basics of the HELK integrations with advanced analytics libraries"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Check the Spark Session via the variable spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "\n",
       "            <div>\n",
       "                <p><b>SparkSession - hive</b></p>\n",
       "                \n",
       "        <div>\n",
       "            <p><b>SparkContext</b></p>\n",
       "\n",
       "            <p><a href=\"http://729edcf1ff97:4040\">Spark UI</a></p>\n",
       "\n",
       "            <dl>\n",
       "              <dt>Version</dt>\n",
       "                <dd><code>v2.3.0</code></dd>\n",
       "              <dt>Master</dt>\n",
       "                <dd><code>spark://helk-spark-master:7077</code></dd>\n",
       "              <dt>AppName</dt>\n",
       "                <dd><code>PySparkShell</code></dd>\n",
       "            </dl>\n",
       "        </div>\n",
       "        \n",
       "            </div>\n",
       "        "
      ],
      "text/plain": [
       "<pyspark.sql.session.SparkSession at 0x7f5c04ca0278>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Spark RDD from Elasticsearch Index (logs-endpoint-winevent-sysmon-*)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "('611341183',\n",
       " {'@timestamp': '2018-05-02T21:57:12.196Z',\n",
       "  '@version': '1',\n",
       "  'action': 'networkconnect',\n",
       "  'beat': {'hostname': 'DESKTOP-WARDOG',\n",
       "   'name': 'DESKTOP-WARDOG',\n",
       "   'version': '6.1.2'},\n",
       "  'event_id': 3,\n",
       "  'geoip': {},\n",
       "  'host_dst_name': 'DESKTOP-WARDOG',\n",
       "  'host_name': 'DESKTOP-WARDOG',\n",
       "  'host_src_name': 'DESKTOP-WARDOG',\n",
       "  'ip_dst': '0:0:0:0:0:0:0:1',\n",
       "  'ip_dst_isipv6': 'true',\n",
       "  'ip_src': '0:0:0:0:0:0:0:1',\n",
       "  'ip_src_isipv6': 'true',\n",
       "  'level': 'Information',\n",
       "  'log_name': 'Microsoft-Windows-Sysmon/Operational',\n",
       "  'network_initiated': True,\n",
       "  'network_protocol': 'udp',\n",
       "  'opcode': 'Info',\n",
       "  'port_dst_number': 50917,\n",
       "  'port_src_number': 50917,\n",
       "  'process_guid': 'A98268C1-9E9A-5AE6-0000-00102B6B1300',\n",
       "  'process_id': 3880,\n",
       "  'process_name': 'svchost.exe',\n",
       "  'process_path': 'C:\\\\Windows\\\\System32\\\\svchost.exe',\n",
       "  'provider_guid': '5770385F-C22A-43E0-BF4C-06F5698FFBD9',\n",
       "  'record_number': '13712475',\n",
       "  'source_name': 'Microsoft-Windows-Sysmon',\n",
       "  'tags': ('_geoip_lookup_failure',),\n",
       "  'task': 'Network connection detected (rule: NetworkConnect)',\n",
       "  'thread_id': 2932,\n",
       "  'type': 'wineventlog',\n",
       "  'user_domain': 'NT AUTHORITY',\n",
       "  'user_name': 'LOCAL SERVICE',\n",
       "  'user_reporter_domain': 'NT AUTHORITY',\n",
       "  'user_reporter_name': 'SYSTEM',\n",
       "  'user_reporter_sid': 'S-1-5-18',\n",
       "  'user_reporter_type': 'User',\n",
       "  'version': 5})"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "es_rdd = sc.newAPIHadoopRDD(\n",
    "    inputFormatClass=\"org.elasticsearch.hadoop.mr.EsInputFormat\",\n",
    "    keyClass=\"org.apache.hadoop.io.NullWritable\",\n",
    "    valueClass=\"org.elasticsearch.hadoop.mr.LinkedMapWritable\",\n",
    "    conf={\n",
    "        \"es.resource\" : \"logs-endpoint-winevent-sysmon-*/doc\",\n",
    "        \"es.nodes\" : \"helk-elasticsearch\"\n",
    "    })\n",
    "es_rdd.first()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Spark RDD from Elasticsearch Index (logs-endpoint-winevent-security-*)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "('972328025',\n",
       " {'@timestamp': '2018-05-03T07:03:11.309Z',\n",
       "  '@version': '1',\n",
       "  'beat': {'hostname': 'DESKTOP-WARDOG',\n",
       "   'name': 'DESKTOP-WARDOG',\n",
       "   'version': '6.1.2'},\n",
       "  'event_data': {},\n",
       "  'event_id': 4689,\n",
       "  'host_name': 'DESKTOP-WARDOG',\n",
       "  'keywords': ('Audit Success',),\n",
       "  'level': 'Information',\n",
       "  'log_name': 'Security',\n",
       "  'message': 'A process has exited.\\n\\nSubject:\\n\\tSecurity ID:\\t\\tS-1-5-20\\n\\tAccount Name:\\t\\tDESKTOP-WARDOG$\\n\\tAccount Domain:\\t\\tWORKGROUP\\n\\tLogon ID:\\t\\t0x3E4\\n\\nProcess Information:\\n\\tProcess ID:\\t0x10e0\\n\\tProcess Name:\\tC:\\\\Windows\\\\System32\\\\msdtc.exe\\n\\tExit Status:\\t0x0',\n",
       "  'opcode': 'Info',\n",
       "  'process_id': 4,\n",
       "  'process_name': 'msdtc.exe',\n",
       "  'process_parent_id': 0,\n",
       "  'process_path': 'C:\\\\Windows\\\\System32\\\\msdtc.exe',\n",
       "  'process_status': '0x0',\n",
       "  'provider_guid': '{54849625-5478-4994-A5BA-3E3B0328C30D}',\n",
       "  'record_number': '34554',\n",
       "  'source_name': 'Microsoft-Windows-Security-Auditing',\n",
       "  'task': 'Process Termination',\n",
       "  'thread_id': 4020,\n",
       "  'type': 'wineventlog',\n",
       "  'user_domain': 'WORKGROUP',\n",
       "  'user_logon_id': 996,\n",
       "  'user_name': 'DESKTOP-WARDOG$',\n",
       "  'user_sid': 'S-1-5-20'})"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "es_rdd = sc.newAPIHadoopRDD(\n",
    "    inputFormatClass=\"org.elasticsearch.hadoop.mr.EsInputFormat\",\n",
    "    keyClass=\"org.apache.hadoop.io.NullWritable\",\n",
    "    valueClass=\"org.elasticsearch.hadoop.mr.LinkedMapWritable\",\n",
    "    conf={ \n",
    "        \"es.resource\" : \"logs-endpoint-winevent-security-*/doc\",\n",
    "        \"es.nodes\" : \"helk-elasticsearch\"\n",
    "    })\n",
    "es_rdd.first()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import the GraphFrames package"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "from graphframes import *"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------+\n",
      "| id|inDegree|\n",
      "+---+--------+\n",
      "|  c|       1|\n",
      "|  b|       2|\n",
      "+---+--------+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "2"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Create a Vertex DataFrame with unique ID column \"id\"\n",
    "v = sqlContext.createDataFrame([\n",
    "  (\"a\", \"Alice\", 34),\n",
    "  (\"b\", \"Bob\", 36),\n",
    "  (\"c\", \"Charlie\", 30),\n",
    "], [\"id\", \"name\", \"age\"])\n",
    "# Create an Edge DataFrame with \"src\" and \"dst\" columns\n",
    "e = sqlContext.createDataFrame([\n",
    "  (\"a\", \"b\", \"friend\"),\n",
    "  (\"b\", \"c\", \"follow\"),\n",
    "  (\"c\", \"b\", \"follow\"),\n",
    "], [\"src\", \"dst\", \"relationship\"])\n",
    "# Create a GraphFrame\n",
    "from graphframes import *\n",
    "g = GraphFrame(v, e)\n",
    "\n",
    "# Query: Get in-degree of each vertex.\n",
    "g.inDegrees.show()\n",
    "\n",
    "# Query: Count the number of \"follow\" connections in the graph.\n",
    "g.edges.filter(\"relationship = 'follow'\").count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a basic SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"HELK\") \\\n",
    "    .config(\"es.read.field.as.array.include\", \"tags\") \\\n",
    "    .config(\"es.nodes\",\"helk-elasticsearch:9200\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Spark SQL Basic Query (logs-endpoint-winevent-security-* as source)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.format(\"org.elasticsearch.spark.sql\").load(\"logs-endpoint-winevent-security-*/doc\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- @date_new_time: timestamp (nullable = true)\n",
      " |-- @date_previous_time: timestamp (nullable = true)\n",
      " |-- @timestamp: timestamp (nullable = true)\n",
      " |-- @version: string (nullable = true)\n",
      " |-- activity_id: string (nullable = true)\n",
      " |-- beat: struct (nullable = true)\n",
      " |    |-- hostname: string (nullable = true)\n",
      " |    |-- name: string (nullable = true)\n",
      " |    |-- version: string (nullable = true)\n",
      " |-- event_data: struct (nullable = true)\n",
      " |    |-- AdvancedOptions: string (nullable = true)\n",
      " |    |-- ConfigAccessPolicy: string (nullable = true)\n",
      " |    |-- DisableIntegrityChecks: string (nullable = true)\n",
      " |    |-- FlightSigning: string (nullable = true)\n",
      " |    |-- HypervisorDebug: string (nullable = true)\n",
      " |    |-- HypervisorLaunchType: string (nullable = true)\n",
      " |    |-- HypervisorLoadOptions: string (nullable = true)\n",
      " |    |-- KernelDebug: string (nullable = true)\n",
      " |    |-- LoadOptions: string (nullable = true)\n",
      " |    |-- PackageName: string (nullable = true)\n",
      " |    |-- RemoteEventLogging: string (nullable = true)\n",
      " |    |-- Status: string (nullable = true)\n",
      " |    |-- SubjectDomainName: string (nullable = true)\n",
      " |    |-- SubjectLogonId: string (nullable = true)\n",
      " |    |-- SubjectUserName: string (nullable = true)\n",
      " |    |-- SubjectUserSid: string (nullable = true)\n",
      " |    |-- TargetUserName: string (nullable = true)\n",
      " |    |-- TestSigning: string (nullable = true)\n",
      " |    |-- VsmLaunchType: string (nullable = true)\n",
      " |    |-- Workstation: string (nullable = true)\n",
      " |-- event_id: long (nullable = true)\n",
      " |-- group_domain_enumerated: string (nullable = true)\n",
      " |-- group_name_enumerated: string (nullable = true)\n",
      " |-- group_sid_enumerated: string (nullable = true)\n",
      " |-- host_name: string (nullable = true)\n",
      " |-- host_src_name: string (nullable = true)\n",
      " |-- impersonation_level: string (nullable = true)\n",
      " |-- ip_src: string (nullable = true)\n",
      " |-- keywords: string (nullable = true)\n",
      " |-- level: string (nullable = true)\n",
      " |-- log_name: string (nullable = true)\n",
      " |-- logon_authentication_package: string (nullable = true)\n",
      " |-- logon_elevated_token: string (nullable = true)\n",
      " |-- logon_key_length: string (nullable = true)\n",
      " |-- logon_package_name: string (nullable = true)\n",
      " |-- logon_process_name: string (nullable = true)\n",
      " |-- logon_restricted_adminmode: string (nullable = true)\n",
      " |-- logon_transmitted_services: string (nullable = true)\n",
      " |-- logon_type: string (nullable = true)\n",
      " |-- logon_virtual_account: string (nullable = true)\n",
      " |-- message: string (nullable = true)\n",
      " |-- opcode: string (nullable = true)\n",
      " |-- port_dst_number: integer (nullable = true)\n",
      " |-- port_src_number: integer (nullable = true)\n",
      " |-- process_id: integer (nullable = true)\n",
      " |-- process_mandatory_level: string (nullable = true)\n",
      " |-- process_name: string (nullable = true)\n",
      " |-- process_parent_id: integer (nullable = true)\n",
      " |-- process_parent_name: string (nullable = true)\n",
      " |-- process_parent_path: string (nullable = true)\n",
      " |-- process_path: string (nullable = true)\n",
      " |-- process_status: string (nullable = true)\n",
      " |-- process_target_id: integer (nullable = true)\n",
      " |-- process_token_elevation_type: string (nullable = true)\n",
      " |-- provider_guid: string (nullable = true)\n",
      " |-- record_number: string (nullable = true)\n",
      " |-- reporter_logon_id: string (nullable = true)\n",
      " |-- service_host_info: string (nullable = true)\n",
      " |-- service_host_name: string (nullable = true)\n",
      " |-- source_name: string (nullable = true)\n",
      " |-- tags: array (nullable = true)\n",
      " |    |-- element: string (containsNull = true)\n",
      " |-- task: string (nullable = true)\n",
      " |-- thread_id: long (nullable = true)\n",
      " |-- type: string (nullable = true)\n",
      " |-- user_domain: string (nullable = true)\n",
      " |-- user_domain_enumerated: string (nullable = true)\n",
      " |-- user_explicit_domain: string (nullable = true)\n",
      " |-- user_explicit_logon_guid: string (nullable = true)\n",
      " |-- user_explicit_name: string (nullable = true)\n",
      " |-- user_logon_guid: string (nullable = true)\n",
      " |-- user_logon_id: long (nullable = true)\n",
      " |-- user_logon_linkedid: string (nullable = true)\n",
      " |-- user_name: string (nullable = true)\n",
      " |-- user_name_enumerated: string (nullable = true)\n",
      " |-- user_networkaccount_domain: string (nullable = true)\n",
      " |-- user_networkaccount_name: string (nullable = true)\n",
      " |-- user_principal_domain: string (nullable = true)\n",
      " |-- user_principal_id: string (nullable = true)\n",
      " |-- user_principal_name: string (nullable = true)\n",
      " |-- user_principal_sid: string (nullable = true)\n",
      " |-- user_reporter_domain: string (nullable = true)\n",
      " |-- user_reporter_name: string (nullable = true)\n",
      " |-- user_reporter_sid: string (nullable = true)\n",
      " |-- user_sid: string (nullable = true)\n",
      " |-- user_sid_enumerated: string (nullable = true)\n",
      " |-- version: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+\n",
      "|                task|\n",
      "+--------------------+\n",
      "|    Process Creation|\n",
      "| Process Termination|\n",
      "|Other Policy Chan...|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|               Logon|\n",
      "|               Logon|\n",
      "|               Logon|\n",
      "|               Logon|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "|    Process Creation|\n",
      "+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(\"task\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
